package mr

import (
	"context"
	"errors"
	"log"
	"sync"
	"time"
)
import "net"
import "os"
import "net/rpc"
import "net/http"

// 任务状态
const (
	TaskNotDo  = iota // 限制
	TaskDoing         // 正在运行
	TaskCommit        //完成

)

type Master struct {
	// Your definitions here.
	files   []string
	nReduce int // 划分为几个reduce任务

	mapTasks    []int // map任务
	reduceTasks []int // reduce任务

	mapCount     int // 记录已经完成了几个map任务了
	mapDone      bool
	reduceCount  int
	reduceDone   bool
	workerCommit map[string]int // worker对应的状态
	allCommited  bool           // 是否所有任务都完成了

	timeout time.Duration
	mu      sync.Mutex
}

// Your code here -- RPC handlers for the worker to call.
func (m *Master) Work(args *WorkArgs, info *TaskInfo) error {
	// master分派任务前先上锁
	m.mu.Lock()
	defer m.mu.Unlock()

	// dispatch map work
	if !m.mapDone {
		err := m.dispatchMapWork(args, info)
		return err
	} else if m.mapDone && !m.reduceDone {
		err := m.dispatchReduceWork(args, info)
		return err
	}

	// 走到这一步，则查看是否所有的worker都完成了
	if !m.mapDone || !m.reduceDone {
		info.IsFinished = false
		return nil
	}

	info.IsFinished = true
	return errors.New("worker apply but no tasks to dispatch")

}

func (m *Master) dispatchMapWork(args *WorkArgs, info *TaskInfo) error {
	// 轮询所有的map任务，找到处于TaskNotDo状态的任务进行分配
	for i, file := range m.files {
		// 跳过正在处理或者已经处理完毕的任务
		if m.mapTasks[i] != TaskNotDo {
			continue
		}

		info.TaskId = i             // 当前任务id
		info.FileName = file        // 当前任务的文件名
		info.MapReduce = "map"      // 当前任务是map任务
		info.FileNumber = m.nReduce // 当前任务需要划分出几个输出文件
		info.IsFinished = false     // 当前任务是否完成

		m.workerCommit[args.WorkerId] = TaskDoing // 记录此时分派的worker的工作状态
		m.mapTasks[i] = TaskDoing                 // 记录此时分派的任务的状态

		// 设置超时时间
		// master在分配任务时，开启计时,如果worker超时仍然没有提交任务
		// 则认为该worker无法完成任务，并将任务重新分配给其他worker
		ctx, _ := context.WithTimeout(context.Background(), m.timeout)
		// 开启一个线程去监听超时
		go func() {
			select {
			// 此处会一直堵塞，直到Done发送一个消息过来
			case <-ctx.Done():
				{
					m.mu.Lock()
					defer m.mu.Unlock()
					// 若worker没有提交任务并且该map任务也没有提交，则说明超时
					if m.workerCommit[args.WorkerId] != TaskCommit && m.mapTasks[i] != TaskCommit {
						// 超时未完成，则把当前task设置为notdo状态，这样就会允许别的worker去处理它
						m.mapTasks[i] = TaskNotDo
					}
				}
			}
		}()
		return nil
	}
	return nil
}

func (m *Master) dispatchReduceWork(args *WorkArgs, info *TaskInfo) error {

	//分配 reduce work
	for i, v := range m.reduceTasks {
		if v != TaskNotDo {
			continue
		}
		info.TaskId = i
		info.FileName = "" // 过后生成
		info.MapReduce = "reduce"
		info.FileNumber = len(m.files) // map处理的文件数，因为要用这个数量来生成文件名
		info.IsFinished = false
		m.workerCommit[args.WorkerId] = TaskDoing
		m.reduceTasks[i] = TaskDoing

		ctx, _ := context.WithTimeout(context.Background(), m.timeout)
		go func() {
			select {
			case <-ctx.Done():
				{
					m.mu.Lock()
					defer m.mu.Unlock()
					if m.workerCommit[args.WorkerId] != TaskCommit && m.reduceTasks[i] != TaskCommit {
						m.reduceTasks[i] = TaskNotDo
					}
				}
			}
		}()
		return nil
	}

	return nil
}

func (m *Master) Commit(args *CommitInfo, reply *CommitReply) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	switch args.MapReduce {
	case "map":
		{
			m.mapTasks[args.TaskId] = TaskCommit
			m.workerCommit[args.WorkerId] = TaskCommit
			m.mapCount++
			if m.mapCount == len(m.files) {
				m.mapDone = true
			}
		}
	case "reduce":
		{
			m.reduceTasks[args.TaskId] = TaskCommit
			m.workerCommit[args.WorkerId] = TaskCommit
			m.reduceCount++
			if m.reduceCount == m.nReduce {
				m.reduceDone = true
			}
		}
	}

	if m.mapDone && m.reduceDone {
		// 都完成了就把该标志符设置为true
		m.allCommited = true
	}

	return nil
}

//
// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
//
func (m *Master) Example(args *ExampleArgs, reply *ExampleReply) error {
	reply.Y = args.X + 1
	return nil
}

//
// start a thread that listens for RPCs from worker.go
//
func (m *Master) server() {
	rpc.Register(m)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	sockname := masterSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}

//
// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
//
func (m *Master) Done() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.allCommited
}

//
// create a Master.
// main/mrmaster.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeMaster(files []string, nReduce int) *Master {
	// 初始化一个master
	m := Master{
		files:        files,
		nReduce:      nReduce,
		mapTasks:     make([]int, len(files)),
		reduceTasks:  make([]int, nReduce),
		workerCommit: make(map[string]int),
		mapDone:      false,
		reduceDone:   false,
		allCommited:  false,
		timeout:      10 * time.Second,
	}

	// Your code here.

	m.server()
	return &m
}
